sinks: walk the input arrangement via cursor per batch #36165
DAlperin merged 5 commits into MaterializeInc:main
Conversation
Force-pushed 264cfea to 36243b0
Extracts the batch-level cursor walking logic from combine_at_timestamp into a standalone helper that yields DiffPairs one at a time via callback. This lets sinks consume an arrangement directly inside their own operator without materializing a Vec<DiffPair> per (key, timestamp) group. combine_at_timestamp is left in place; subsequent commits will migrate the Kafka and Iceberg sinks to consume arrangements via the new helper.
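The shape of that helper can be sketched without differential-dataflow. This is a minimal stand-in, not the Materialize implementation: `DiffPair`, the flat `(key, time, value, diff)` batch layout, and the helper's signature are all simplified assumptions; the real helper walks arrangement cursors.

```rust
/// Simplified stand-in for Materialize's DiffPair: the value retracted
/// ("before") and the value inserted ("after") for one key at one time.
#[derive(Debug, PartialEq)]
struct DiffPair<V> {
    before: Option<V>,
    after: Option<V>,
}

/// Hypothetical helper modeled on the PR's for_each_diff_pair: walk updates
/// sorted by (key, time) and hand one DiffPair at a time to `emit`, instead
/// of materializing a Vec<DiffPair> per (key, time) group.
/// Updates are (key, time, value, diff) with diff in {-1, +1}.
fn for_each_diff_pair<K: PartialEq, V: Clone>(
    updates: &[(K, u64, V, i64)],
    mut emit: impl FnMut(&K, u64, DiffPair<V>),
) {
    let mut i = 0;
    while i < updates.len() {
        let key = &updates[i].0;
        let time = updates[i].1;
        // Fold this (key, time) group's retraction/insertion into one pair.
        let mut before = None;
        let mut after = None;
        while i < updates.len() && updates[i].0 == *key && updates[i].1 == time {
            let (_, _, v, diff) = &updates[i];
            if *diff < 0 {
                before = Some(v.clone());
            } else {
                after = Some(v.clone());
            }
            i += 1;
        }
        emit(key, time, DiffPair { before, after });
    }
}
```

The callback style is the point: the caller (a sink operator) sees each pair as the cursor advances, so nothing per-group is ever allocated.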
Force-pushed c94fb74 to bc5c160
def- left a comment
When a Kafka or Iceberg sink is configured with a non-unique key (KEY (...) NOT ENFORCED), Materialize is supposed to log a warning ("primary key error") whenever multiple rows share the same key at the same timestamp. With this PR, the warning doesn't appear if no new data arrives afterwards. Not sure how bad that is.
Reproducer:
diff --git a/test/source-sink-errors/mzcompose.py b/test/source-sink-errors/mzcompose.py
index ac2b7c3e92..5a9bbeb7ee 100644
--- a/test/source-sink-errors/mzcompose.py
+++ b/test/source-sink-errors/mzcompose.py
@@ -13,6 +13,7 @@ Disruption and then checking the mz_internal.mz_*_statuses tables
"""
import random
+import time
from collections.abc import Callable
from dataclasses import dataclass
from textwrap import dedent
@@ -561,3 +562,52 @@ def unsupported_pg_table(c: Composition) -> None:
$ postgres-execute connection=postgres://postgres:postgres@postgres
INSERT INTO source1 VALUES (3, '[2:3]={2,2}')
"""))
+
+
+def workflow_test_pk_violation_warning(c: Composition) -> None:
+ seed = random.randint(0, 256**4)
+ c.up("redpanda", "materialized", Service("testdrive", idle=True))
+
+ with c.override(
+ Testdrive(
+ no_reset=True,
+ seed=seed,
+ entrypoint_extra=["--initial-backoff=1s", "--backoff-factor=0"],
+ )
+ ):
+ c.testdrive(dedent("""
+ > CREATE CONNECTION kafka_conn
+ TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
+
+ > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
+ URL '${testdrive.schema-registry-url}'
+ );
+
+ > CREATE TABLE t (a INT, b INT);
+
+ > CREATE SINK pk_test_sink FROM t
+ INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-pk-test-${testdrive.seed}')
+ KEY (a) NOT ENFORCED
+ FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+ ENVELOPE UPSERT;
+
+ > SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'pk_test_sink'
+ running
+ """))
+
+ # PkViolationWarner rate-limits to once per 10s from construction.
+ time.sleep(12)
+
+ c.testdrive(dedent("""
+ > INSERT INTO t VALUES (1, 10), (1, 20);
+
+ $ set-sql-timeout duration=60s
+ $ kafka-verify-topic sink=materialize.public.pk_test_sink await-value-schema=true
+ """))
+
+ time.sleep(5)
+
+ logs = c.invoke("logs", "materialized", capture=True)
+ assert "primary key error" in logs.stdout, (
+ "PkViolationWarner::flush() not called after batch"
+ )

Replaces the pre-grouped Vec Collection<(Option<Row>, DiffPair<Row>)> that render_sink previously handed sinks with an Arranged<SinkTrace> — each sink now walks cursors itself via for_each_diff_pair.
- The shared key-extraction + arrangement logic lives in render_sink in sinks.rs; zip_into_diff_pairs and combine_at_timestamp are gone.
- Kafka's encode_collection walks the arrangement inside its async operator, emitting a KafkaMessage per DiffPair.
- Iceberg gains a small walker operator at its input that walks the arrangement into the same (Option<Row>, DiffPair<Row>) stream shape its mint / write / commit pipeline already consumed.
- Per-(key, timestamp) primary-key-violation detection is lifted into a new PkViolationWarner in sinks.rs, observed incrementally as each sink walks the arrangement instead of after a Vec<DiffPair> materialization.

The immediate win is skipping the Vec<DiffPair> allocation per (key, time) in combine_at_timestamp; the larger architectural win is that sinks can now make envelope- and batch-specific decisions about cursor navigation (e.g., snapshot fast paths) without touching shared plumbing.
SinkRender::render_sink now takes a Stream<Vec<SinkBatch>> instead of an Arranged<SinkTrace>. Sinks only ever walk incoming batches (via for_each_diff_pair) — they never use the TraceAgent for random access — so the reader handle is dead weight. arrange_sink_input still calls arrange_named, but immediately extracts arranged.stream and lets the surrounding Arranged (and its TraceAgent) drop. With no reader holding compaction frontiers, the arrange operator's spine can compact to the empty antichain as batches flow, releasing historical batch state instead of accumulating it. Mirrors the pattern used by DD's consolidate_named, which builds an arrangement only to call as_collection on it and drop the trace.
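The "drop the reader so the spine can compact" mechanic can be illustrated with plain reference counting. This is an analogy, not the differential-dataflow API: `ReaderHandle` stands in for the `TraceAgent`, and the `Weak` models "compaction may discard what no live reader can still demand".

```rust
use std::rc::{Rc, Weak};

/// Toy reader handle: while it lives, it keeps the historical batches alive,
/// the way a TraceAgent's compaction frontier keeps spine history resident.
struct ReaderHandle(Rc<Vec<&'static str>>);

/// Returns (history alive while reader held, history alive after reader drop).
fn demo() -> (bool, bool) {
    let batches = Rc::new(vec!["batch-0", "batch-1"]);
    // The arrangement side holds only a Weak: it never pins history itself.
    let history: Weak<Vec<&'static str>> = Rc::downgrade(&batches);
    let reader = ReaderHandle(batches); // reader now owns the only strong ref
    let while_held = history.upgrade().is_some();
    drop(reader); // mirrors letting the Arranged (and its TraceAgent) drop
    let after_drop = history.upgrade().is_some();
    (while_held, after_drop)
}
```

In the PR the same effect comes from extracting `arranged.stream` and letting the surrounding `Arranged` go out of scope: with no handle outstanding, the arrange operator's spine can compact toward the empty antichain as batches flow.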
IcebergSink measures steady-state streaming (1 seed row in before() then INSERT of 1M rows inside benchmark()), which exercises incremental emission but not the snapshot path where the sink reads a pre-existing source from as_of. IcebergSnapshot mirrors ExactlyOnce's shape: init() pre-populates a table with 1M rows, before() generates a unique Iceberg destination per run, and benchmark() creates a fresh sink and measures until the full snapshot is committed. This is the path where the zip_into_diff_pairs allocation hotspot lived, and therefore the relevant micro-benchmark for the sink refactor.
Force-pushed bc5c160 to 46f9ca4
martykulma left a comment
looks great @DAlperin - minor comments, nothing blocking.
    // just needed a value to use to distribute work.
    if user_key_indices.is_none() {
        collection = collection.map(|(_key, value)| (None, value))
    let arranged = keyed.arrange_named::<OrdValBatcher<_, _, _, _>, RcOrdValBuilder<_, _, _, _>, OrdValSpine<_, _, _, _>>("Arrange Sink");
nit: more explicit RE: dropping the trace
let Arranged{ stream, trace: _ } = ...;
stream
curious if the trace would be optimized out by the compiler in this current version, or would this nit possibly provide some perf benefit?
agreed with the nit in terms of readability, though
You should make sure to drop the trace, otherwise you risk never releasing arrangement memory. A handle to a trace needs to be downgraded to release resources.
I updated the code to make the drop clearer, but this is what's happening, no? The trace goes out of scope and is dropped. TraceAgent::drop then calls downgrade before the Rc falls.
(My comment was less about the implementation and rather what should be true :) I've had some surprises in the past when things were accidentally moved into closures and whatnot.)
    if self.count > 1 {
        let now = Instant::now();
        if now.duration_since(self.last_warning) >= Duration::from_secs(10) {
            self.last_warning = now;
if the snapshot is quick (<10s), this won't emit a warning. It looks like the previous one didn't either, so possibly not a big deal.
Yeah, stole it from the old implementation, not that worried about it for now
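The reviewer's "quick snapshot emits no warning" observation is easy to reproduce in isolation. A sketch of the rate limiter under discussion, with field and method names guessed from the review snippet (the real PkViolationWarner's API may differ):

```rust
use std::time::{Duration, Instant};

/// Sketch of the warner's rate limiting: at most one warning per `period`,
/// measured from construction / the last emission. Violations observed
/// entirely within the first window stay silent, which is the reviewer's
/// point about snapshots that finish in under 10 seconds.
struct PkViolationWarner {
    count: usize,
    last_warning: Instant,
    period: Duration,
}

impl PkViolationWarner {
    fn new(period: Duration) -> Self {
        PkViolationWarner { count: 0, last_warning: Instant::now(), period }
    }

    /// Record that `n` rows shared one key at one timestamp; returns whether
    /// a warning would be logged now.
    fn observe(&mut self, n: usize) -> bool {
        self.count = n;
        if self.count > 1 {
            let now = Instant::now();
            if now.duration_since(self.last_warning) >= self.period {
                self.last_warning = now;
                return true; // the real code would log "primary key error"
            }
        }
        false
    }
}
```

With `period = 10s`, the very first violation after construction is suppressed; a zero period makes every violation warn, which also shows the suppression is purely the window, not the detection.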
    let fan_out = |(t, v, cnt): (B::Time, B::ValOwn, usize)| {
        if cnt == 1 {
            Either::Left(iter::once((t, v)))
        } else {
            Either::Right(iter::repeat((t, v)).take(cnt))
        }
should do the same
std::iter::repeat_n((t,v), cnt)
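The suggestion collapses both branches into one call. A self-contained sketch (the generic signature here is illustrative; the real closure is typed against the batch's `B::Time` and `B::ValOwn`):

```rust
use std::iter;

/// std::iter::repeat_n (stable since Rust 1.82) covers both the cnt == 1 and
/// cnt > 1 cases, so the Either::Left / Either::Right branching in fan_out
/// becomes unnecessary.
fn fan_out<T: Clone, V: Clone>((t, v, cnt): (T, V, usize)) -> impl Iterator<Item = (T, V)> {
    iter::repeat_n((t, v), cnt)
}
```

As a bonus, `repeat_n` yields its value by move on the final iteration, so it clones one fewer time than `repeat(..).take(cnt)`.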
Force-pushed a0c2078 to bda54df
    let keyed = match key_indices {
        None => collection.map(|row| (Some(Row::pack(Some(Datum::UInt64(row.hashed())))), row)),
        Some(key_indices) => {
            let mut datum_vec = mz_repr::DatumVec::new();
this is obviously unrelated to your changes, but I'm curious: why is the key a whole Row of its own that needs to be packed? Do the sinks represent the key as a sort of struct column that needs to be built from the Datums at each of the key indices? It feels like it would make more sense to merge all the key Datums into one combined key Datum rather than its own Row. I suppose we can't really precompute a key column because it's entirely dependent on the sink (different sinks for the same persist collection could want different, arbitrary keys), but the TODO[perf] had me wondering.
Summary
Refactor the sink rendering path so sinks walk their input arrangement's cursors directly, eliminating the Vec<DiffPair<Row>>-per-group materialization and the trace-reader overhead that the zip_into_diff_pairs → combine_at_timestamp → flat_map pipeline previously incurred. Motivated by profiles of very large snapshot sinks where that pipeline dominated allocation and caused major page faults.

What this buys
- No per-(key, time) Vec<DiffPair<Row>> allocation. Biggest direct win — this was the dominant allocator in zip_into_diff_pairs on the profile.
- DiffPairs stream one at a time per (key, time) group.
- Fewer operator hops on the Kafka path (combine_at_timestamp → flat_map → encode). Iceberg: one fewer hop too.
- Dropping the TraceAgent lets spine compaction advance its frontier, releasing historical batch state rather than accumulating it.

The wins come from removing Vec/operator overhead, not from the spine itself shrinking.

Follow-ups (not in this PR) worth considering:
Test plan
- cargo check --workspace --all-targets
- cargo clippy -p mz-interchange -p mz-storage --tests
- bin/lint (check-no-diff fails locally due to jj colocation; all substantive checks pass)
- cargo test -p mz-interchange --lib envelopes:: (7 tests, all pass)

🤖 Generated with Claude Code